
Data Exchange

Example Process for Automating Data Upload

The following Python scripts illustrate how a process can be set up to check an upload folder for new files and manage the upload via Data Exchange, including checks for upload errors, using the API endpoint POST /api/v2/DataExchangeJob.
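The scripts below use the Assetic Python SDK, which wraps this endpoint. For reference, a direct call to the REST endpoint might look like the sketch below; the site URL, JSON field names and basic authentication credentials shown here are assumptions based on the SDK representation, so confirm them against the Swagger documentation for your site.

import requests

base_url = "https://yourorganisation.assetic.net"  # hypothetical site URL
payload = {
    "ProfileId": "876a0a4e-c732-e611-945f-06edd62954d7",  # Data Exchange profile id (assumed field name)
    "DocumentId": "00000000-0000-0000-0000-000000000000"  # previously uploaded document id (assumed field name)
}
response = requests.post(
    base_url + "/api/v2/DataExchangeJob",
    json=payload,
    auth=("username", "api_token"))  # replace with your credentials
response.raise_for_status()
# the response body identifies the Data Exchange job/task that was created
print(response.text)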

 

  1. """
  2. Example script to automate data exchange (Assetic.DataExchangeProcessNew.py)
  3. First of two scripts
  4. This script uploads to Assetic the files in the 'ToLoad' folder. The
  5. filename is parsed to get the asset category which allows the data exchange
  6. profile to be determined. A data exchange task is created for the document
  7. The name of the file is changed to the task id and moved to the 'InProgress'
  8. folder. This allows the subsequent script to identify the task and check
  9. progress
  10. """
  11. import assetic
  12. import os
  13. import shutil
  14. import sys
  15. import base64
  16. # Assetic SDK instance
  17. asseticsdk = assetic.AsseticSDK("C:/Users/kwilton/assetic.ini",None,"Info")
  18. ##API instances
  19. # Document API
  20. docapi = assetic.DocumentApi()
  21. # Data Exchange API
  22. dataexchangeapi = assetic.DataExchangeJobApi()
  23. # File Processing Directory structure
  24. intdir = "C:/temp/Integration/"
  25. sourcedir = intdir + "ToLoad/"
  26. inprogress = intdir + "InProgress/"
  27. errordir = intdir + "Error/"
  28. processeddir = intdir + "Processed/"
  29. ##preview
  30. profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
  31. 'Roads': 'd889d99b-3317-e611-9458-06edd62954d7'}
  32. ##demo
  33. profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
  34. 'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
  35. 'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}
  36. ##Loop through files in folder
  37. files = os.listdir(sourcedir)
  38. for filename in files:
  39. fullfilename = sourcedir + filename
  40. #get category, assumes file name has a whitespace between date and category
  41. filedate, categorywithext = filename.split(None,1)
  42. category, fileext = os.path.splitext(categorywithext)
  43. asseticsdk.logger.info("File Date: {0}, Category: {1}".format(
  44. filedate, category))
  45. ##read file and encode for upload
  46. with open(fullfilename, "rb") as f:
  47. data = f.read()
  48. if sys.version_info < (3,0):
  49. filecontents = data.encode("base64")
  50. else:
  51. filecontents = base64.b64encode(data)
  52. filecontents = filecontents.decode(encoding="utf-8", errors="strict")
  53. ##create file properties object and set values
  54. file_properties = \
  55. assetic.Assetic3IntegrationRepresentationsFilePropertiesRepresentation()
  56. file_properties.name = filename
  57. file_properties.mimetype = 'csv'
  58. file_properties.filecontent = filecontents
  59. filearray = [file_properties]
  60. ##create document object and assign values, including file properties
  61. document = \
  62. assetic.Assetic3IntegrationRepresentationsDocumentRepresentation()
  63. document.label = 'Data Exchange - ' + filename
  64. document.file_extension = 'csv'
  65. document.document_type = 'DataExchange'
  66. document.mime_type = 'csv'
  67. document.doc_group_id = 1
  68. document.file_property = filearray
  69. #Upload document and get ID from response
  70. doc = docapi.document_post(document)
  71. docid = doc[0].get('Id')
  72. #get dataexchange profile for category
  73. profileid = profiles.get(category)
  74. ##prepare data exchange parameters
  75. job = assetic.Assetic3IntegrationRepresentationsDataExchangeJobRepresentation()
  76. job.profile_id = profileid
  77. job.document_id = docid
  78. asseticsdk.logger.info("Profile ID:{0}".format(job.profile_id))
  79. asseticsdk.logger.info("Job Document ID:{0}".format(job.document_id))
  80. try:
  81. task = dataexchangeapi.data_exchange_job_post(job)
  82. #move file to in-progress, and rename to task id to allow checks
  83. #on success/failure
  84. destination = inprogress + task + '.csv'
  85. shutil.move(fullfilename,destination)
  86. except assetic.rest.ApiException as e:
  87. #gross error - move to error folder??
  88. asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
  89. e.status,e.reason))
  90. destination = errordir + task + '.csv'
  91. shutil.move(fullfilename,destination)

How it works

The folder locations are defined in the script.  It assumes that an external process is populating the source folder with files that are to be uploaded.

##File Processing Directory structure
intdir = "C:/temp/Integration/"
sourcedir = intdir + "ToLoad/"
inprogress = intdir + "InProgress/"
errordir = intdir + "Error/"
processeddir = intdir + "Processed/"
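The scripts assume these folders already exist. If they might not, a small guard at the top of the script (a sketch, not part of the original scripts) can create any that are missing:

import os

# create the working folders if they do not already exist
for folder in (sourcedir, inprogress, errordir, processeddir):
    if not os.path.isdir(folder):
        os.makedirs(folder)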

Each file in the source folder will be processed one at a time

files = os.listdir(sourcedir)
for filename in files:

In this example each Asset Category will have a separate Data Exchange profile, so the profile ID for each category is defined

##List of categories and associated dataexchange profile
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
            'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
            'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}

 The filename is parsed to determine the category and hence the profile ID

#get category, assumes file name has a whitespace between date and category
filedate, categorywithext = filename.split(None, 1)
category, fileext = os.path.splitext(categorywithext)
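For example, a file named '20160701 Roads.csv' (a hypothetical name used only for illustration) would be parsed as follows:

import os

filename = "20160701 Roads.csv"                        # hypothetical example file
filedate, categorywithext = filename.split(None, 1)    # "20160701", "Roads.csv"
category, fileext = os.path.splitext(categorywithext)  # "Roads", ".csv"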

 Get the dataexchange profile for the category

    profileid = profiles.get(category)
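Note that profiles.get() returns None when the category has no entry in the dictionary, and the upload script does not check for this. A small guard inside the file loop (a sketch, not part of the original script) could divert such files to the error folder:

profileid = profiles.get(category)
if profileid is None:
    # no Data Exchange profile is configured for this category
    asseticsdk.logger.error("No profile defined for category: {0}".format(category))
    shutil.move(fullfilename, errordir + filename)
    continue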

 The document is uploaded and the document ID obtained

#Upload document and get ID from response
doc = docapi.document_post(document)
docid = doc[0].get('Id')
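The upload script assumes the document post succeeds. If the file should be diverted to the error folder when the upload fails, the call can be wrapped in the same exception handling used later for the job post (a sketch, not part of the original script):

try:
    doc = docapi.document_post(document)
except assetic.rest.ApiException as e:
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(e.status, e.reason))
    shutil.move(fullfilename, errordir + filename)
    continue
docid = doc[0].get('Id')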

 

After preparing the job information, the job is posted to Data Exchange.  If an exception is returned it means there was a problem creating the job; perhaps the profile ID is incorrect or there are insufficient privileges to create the job.

If there is no exception the Data Exchange task ID is returned.  Since tasks are queued, the task may not run immediately.  To allow the task to be checked at a later stage the file is renamed to the task ID and moved to the 'InProgress' folder.

try:
    task = dataexchangeapi.data_exchange_job_post(job)
    #move file to in-progress, and rename to task id to allow checks
    #on success/failure
    destination = inprogress + task + '.csv'
    shutil.move(fullfilename, destination)
except assetic.rest.ApiException as e:
    #error creating the job - move the file, under its original name, to the error folder
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(e.status, e.reason))
    destination = errordir + filename
    shutil.move(fullfilename, destination)

Checking if a task has been processed

The following Python script uses the file name to check whether the Data Exchange task has run.  If it has run successfully the file is moved to the 'Processed' folder, otherwise it is moved to the 'Error' folder.  This script is scheduled to run periodically.

 

  1. """
  2. Example script to automate data exchange (Assetic.DataExchangeCheckTask.py)
  3. Second of two scripts
  4. This script checks the 'InProcess' folder to see if task has beeen
  5. processed in data exchange. The names of the files in this folder have been
  6. changed from the original name to the data exchange task id
  7. If a task has been processed then it moves the file to either the
  8. 'Processed' folder or the 'Error' folder. If there is an error
  9. the error file generated by data exchange is also downloaded and
  10. placed in the error folder
  11. """
  12. import assetic
  13. import os
  14. import shutil
  15. import sys
  16. ##Assetic SDK instance
  17. asseticsdk = assetic.AsseticSDK("c:/users/you/assetic.ini",None,"Info")
  18. ##create an instance of the DataExchange API
  19. dataexchangeapi = assetic.DataExchangeTaskApi()
  20. ##Create an instance for document get.
  21. ##Use sdk client for document get. Returns header 'content-disposition'
  22. ##The header has the file name of the doc
  23. #Document API
  24. docapi = assetic.DocumentApi(asseticsdk.client_for_docs)
  25. # Define file path for output report files
  26. intdir = "C:/temp/Integration/"
  27. sourcedir = intdir + "ToLoad/"
  28. inprogressdir = intdir + "InProgress/"
  29. errordir = intdir + "Error/"
  30. processeddir = intdir + "Processed/"
  31. ##define function for getting error file
  32. def getfile(docid,docapi,filedir):
  33. try:
  34. getfile = docapi.document_get_document_file(docid)
  35. except assetic.rest.ApiException as e:
  36. asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
  37. e.status,e.reason))
  38. if getfile != None:
  39. if 'attachment' in getfile[1] and 'filename=' in getfile[1]:
  40. filename = getfile[1].split('filename=',1)[1]
  41. if '"' in filename or "'" in filename:
  42. filename = filename[1:-1]
  43. fullfilename = filedir + filename
  44. else:
  45. fullfilename = filedir +filename
  46. data = getfile[0]
  47. if sys.version_info >= (3,0):
  48. try:
  49. data = data.encode('utf-8')
  50. except:
  51. data = data
  52. with open( fullfilename, 'wb' ) as out_file:
  53. out_file.write(data)
  54. asseticsdk.logger.info('Created file: {0}'.format(fullfilename))
  55. ##end function
  56. files = os.listdir(inprogressdir)
  57. for filename in files:
  58. fullfilename = inprogressdir + filename
  59. taskguid, fileext = os.path.splitext(filename)
  60. try:
  61. task = dataexchangeapi.data_exchange_task_get(taskguid)
  62. except assetic.rest.ApiException as e:
  63. asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
  64. e.status,e.reason))
  65. else:
  66. status = task.get('StatusDescription')
  67. if status == 'Complete with error':
  68. #move file to error
  69. destination = errordir + filename
  70. shutil.move(fullfilename,destination)
  71. ##also get error file
  72. errordocid = task.get('ErrorDocumentId')
  73. errordesc = task.get('Summary')
  74. if errordocid == None:
  75. asseticsdk.logger.info("Error Description: {0}".format(
  76. errordesc))
  77. else:
  78. asseticsdk.logger.info("Error document: {0}".format(errordocid))
  79. errordirtask = errordir + taskguid
  80. getfile(errordocid,docapi,errordirtask)
  81. elif status == 'Completed':
  82. #move file to Completed
  83. destination = processeddir + filename
  84. shutil.move(fullfilename,destination)

How it works

The script loops through the files in the 'InProgress' folder and obtains the Data Exchange task ID from the file name.  This script is scheduled to run periodically.

files = os.listdir(inprogressdir)
for filename in files:
    fullfilename = inprogressdir + filename
    taskguid, fileext = os.path.splitext(filename)

The task is requested and the status of the response is checked

try:
    task = dataexchangeapi.data_exchange_task_get(taskguid)
except assetic.rest.ApiException as e:
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(e.status, e.reason))
else:
    status = task.get('StatusDescription')

If the status is 'Complete with error' the file is moved to the error folder.  If the error is due to a problem with the field mapping then the 'Summary' describes the problem.

If there is a data error then an error file is generated by Data Exchange.  This file is downloaded and saved in the 'Error' folder along with the document.  The function 'getfile' is used to get the file.

#move file to the error folder
destination = errordir + filename
shutil.move(fullfilename, destination)
##also get the error file
errordocid = task.get('ErrorDocumentId')
errordesc = task.get('Summary')
if errordocid is None:
    asseticsdk.logger.info("Error Description: {0}".format(errordesc))
else:
    asseticsdk.logger.info("Error document: {0}".format(errordocid))
    errordirtask = errordir + taskguid
    getfile(errordocid, docapi, errordirtask)

The function 'getfile' is defined in the script body.  Defining functions within the script can make the script easier to read and avoids repeating code for repetitive tasks.

##define function for getting the error file
def getfile(docid, docapi, filedir):
    try:
        getfile = docapi.document_get_document_file(docid)
    except assetic.rest.ApiException as e:
        asseticsdk.logger.error('Status {0}, Reason: {1}'.format(e.status, e.reason))
        return
    if getfile is not None:
        if 'attachment' in getfile[1] and 'filename=' in getfile[1]:
            filename = getfile[1].split('filename=', 1)[1]
            if '"' in filename or "'" in filename:
                filename = filename[1:-1]
                fullfilename = filedir + filename
            else:
                fullfilename = filedir + filename
            data = getfile[0]
            if sys.version_info >= (3, 0):
                try:
                    data = data.encode('utf-8')
                except AttributeError:
                    pass
            with open(fullfilename, 'wb') as out_file:
                out_file.write(data)
            asseticsdk.logger.info('Created file: {0}'.format(fullfilename))
##end function

If the status is 'Completed' it means the Data Exchange task has completed without error.  The file is moved to the 'Processed' folder and no further processing of that file takes place.

elif status == 'Completed':
    #move file to the 'Processed' folder
    destination = processeddir + filename
    shutil.move(fullfilename, destination)

If the task has not been processed then the file remains in the 'InProgress' folder.
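If visibility of tasks that are still queued or running is useful, an optional else branch on the status check (a sketch, not part of the original script, reusing the variables from the loop above) can log them while leaving the file in place:

if status == 'Complete with error':
    shutil.move(fullfilename, errordir + filename)
elif status == 'Completed':
    shutil.move(fullfilename, processeddir + filename)
else:
    # still queued or running - leave the file in 'InProgress' and log the status
    asseticsdk.logger.info("Task {0} not yet processed, status: {1}".format(
        taskguid, status))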